package com.atguigu.flink.state.keyedstate;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
 * Keyed-state demo built on Flink {@code MapState}.
 *
 * <p>For every sensor id, counts how many times each water-level value (vc) has been seen
 * and emits the running counts after each incoming record.
 *
 * <p>Created by Smexy on 2023/11/17
 */
public class Demo3_MapState
{
    /**
     * Builds and runs the job: socket text source -> {@code WaterSensor} -> keyed by sensor id
     * -> per-key counting of water-level values -> print sink.
     *
     * @param args command-line arguments (unused)
     * @throws IllegalStateException if the Flink job fails to execute; the original failure
     *                               is attached as the cause
     */
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env
            .socketTextStream("hadoop102", 8888)
            .map(new WaterSensorMapFunction())
            // Group by sensor id so that each sensor gets its own MapState instance.
            .keyBy(WaterSensor::getId)
            .process(new KeyedProcessFunction<String, WaterSensor, String>()
            {
                // Per-key state: water-level value (vc) -> number of occurrences.
                private MapState<Integer, Integer> map;

                // Runs when the task is initialized; the handle obtained here is where
                // Flink makes any previously backed-up state available again.
                @Override
                public void open(Configuration parameters) throws Exception {
                    map = getRuntimeContext().getMapState(
                        new MapStateDescriptor<>("map", Integer.class, Integer.class));
                }

                @Override
                public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                    Integer vc = value.getVc();

                    // get() returns null the first time this water level is seen for the key.
                    Integer lastCount = map.get(vc);
                    int newCount = (lastCount == null) ? 1 : lastCount + 1;

                    // Update the state.
                    map.put(vc, newCount);

                    // MapState#entries() only promises an Iterable; what its toString()
                    // prints depends on the state backend (some, e.g. RocksDB, hand back an
                    // iterable with no readable toString). Render the entries explicitly
                    // so the output is the same on every backend.
                    out.collect(ctx.getCurrentKey() + ":" + formatCounts(map.entries()));
                }
            })
            .print();

        try {
            env.execute();
        } catch (Exception e) {
            // Do not swallow the failure: printing the stack trace and returning normally
            // would make the process (and any submitting client) report success.
            throw new IllegalStateException("Flink job execution failed", e);
        }
    }

    /**
     * Renders water-level counts as {@code [vc=count, vc=count, ...]}, ordered by ascending
     * water level so the output is deterministic.
     *
     * @param entries the (water level, count) pairs to render; may be empty
     * @return the formatted counts, {@code []} when there are no entries
     */
    private static String formatCounts(Iterable<Map.Entry<Integer, Integer>> entries) {
        return StreamSupport.stream(entries.spliterator(), false)
            .sorted(Map.Entry.comparingByKey())
            .map(entry -> entry.getKey() + "=" + entry.getValue())
            .collect(Collectors.joining(", ", "[", "]"));
    }
}
